package com.hyts.stream.engine.stream.sideoutput;

import com.hyts.stream.engine.model.MetricEvent;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.util.OutputTag;

import java.util.Random;

/**
 * Demo job that splits a single stream of {@link MetricEvent}s into one stream per
 * event name by applying a {@code filter} for each name.
 *
 * <p>The input is produced by an in-process generator ({@code RandomMetricSource}) that
 * emits an event with a randomly chosen name roughly every 300 ms; no external system
 * is read.
 *
 * <p>Created by zhisheng on 2019/10/1 4:54 PM
 * <br>blog: http://www.54tianzhisheng.cn/
 * <br>WeChat official account: zhisheng
 */
public class FilterEvent {

    // Side-output tags. They are not referenced by the filter-based pipeline built in
    // main(); they are kept so the class's declared members stay unchanged.
    private static final OutputTag<String> overFiveTag = new OutputTag<String>("machine") {
    };
    private static final OutputTag<String> equalFiveTag = new OutputTag<String>("docker") {
    };

    /**
     * Builds and runs the job: random source -> one filter per event name -> print.
     *
     * @param args command-line arguments, parsed with {@link ParameterTool#fromArgs} and
     *             registered as the job's global parameters
     * @throws Exception if the job fails to execute
     */
    public static void main(String[] args) throws Exception {
        final ParameterTool params = ParameterTool.fromArgs(args);
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.getConfig().setGlobalJobParameters(params);

        // Locally generated events (not read from Kafka or any other external system).
        DataStreamSource<MetricEvent> data = env.addSource(new RandomMetricSource());

        // One filtered stream per event name.
        SingleOutputStreamOperator<MetricEvent> machineData =
                data.filter(m -> "machine".equals(m.getName()));       // machine events
        SingleOutputStreamOperator<MetricEvent> dockerData =
                data.filter(m -> "docker".equals(m.getName()));        // container events
        // The next two streams are defined but no sink is attached to them here.
        SingleOutputStreamOperator<MetricEvent> applicationData =
                data.filter(m -> "application".equals(m.getName()));   // application events
        SingleOutputStreamOperator<MetricEvent> middlewareData =
                data.filter(m -> "middleware".equals(m.getName()));    // middleware events

        machineData.print();
        dockerData.print();

        env.execute();
    }

    /**
     * Source that emits a {@link MetricEvent} with a randomly chosen name, pausing
     * {@value #EMIT_INTERVAL_MS} ms between events, until {@link #cancel()} is called.
     */
    private static final class RandomMetricSource extends RichSourceFunction<MetricEvent> {

        private static final long serialVersionUID = 1L;

        /** Names an emitted event can carry; each one has a matching filter in main(). */
        private static final String[] NAMES = {"machine", "docker", "application", "middleware"};

        /** Pause between two consecutive events, in milliseconds. */
        private static final long EMIT_INTERVAL_MS = 300L;

        // Written by cancel() and read by the run() loop, hence volatile.
        private volatile boolean running = true;

        /**
         * Emits events until the source is cancelled.
         *
         * @param ctx context used to emit the generated events
         * @throws Exception if the thread is interrupted while sleeping between events
         */
        @Override
        public void run(SourceContext<MetricEvent> ctx) throws Exception {
            // One generator for the whole run instead of a new one per event.
            final Random random = new Random();
            while (running) {
                MetricEvent event = new MetricEvent();
                // Bound by the array length so every name (including the last) can be picked.
                event.setName(NAMES[random.nextInt(NAMES.length)]);
                ctx.collect(event);
                System.out.println("source-------" + event.getName());
                Thread.sleep(EMIT_INTERVAL_MS);
            }
        }

        /** Requests the emit loop in {@link #run} to stop after the current iteration. */
        @Override
        public void cancel() {
            running = false;
        }
    }
}
